@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 *
 * It can be demonstrated with the following example:
 * ```
 * val flow = flowOf(1, 2).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
 * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2a 2b 2c"
 * }
 * ```
 *
 * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
 */
@JvmName("flowCombine")
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
    combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
}

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 *
 * It can be demonstrated with the following example:
 * ```
 * val flow = flowOf(1, 2).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
 * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2a 2b 2c"
 * }
 * ```
 *
 * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
 */
public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
    flow.combine(flow2, transform)

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 *
 * Its usage can be demonstrated with the following example:
 * ```
 * val flow = requestFlow()
 * val flow2 = searchEngineFlow()
 * flow.combineTransform(flow2) { request, searchEngine ->
 *     emit("Downloading in progress")
 *     val result = download(request, searchEngine)
 *     emit(result)
 * }
 * ```
 */
@JvmName("flowCombineTransform")
public fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2
    )
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 *
 * Its usage can be demonstrated with the following example:
 * ```
 * val flow = requestFlow()
 * val flow2 = searchEngineFlow()
 * combineTransform(flow, flow2) { request, searchEngine ->
 *     emit("Downloading in progress")
 *     val result = download(request, searchEngine)
 *     emit(result)
 * }
 * ```
 */
public fun <T1, T2, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2
    )
}

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 */
public fun <T1, T2, T3, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    @BuilderInference transform: suspend (T1, T2, T3) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3
    )
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 */
public fun <T1, T2, T3, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3
    )
}

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 */
public fun <T1, T2, T3, T4, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3,
        args[3] as T4
    )
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 */
public fun <T1, T2, T3, T4, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3,
        args[3] as T4
    )
}

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 */
public fun <T1, T2, T3, T4, T5, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3,
        args[3] as T4,
        args[4] as T5
    )
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 */
public fun <T1, T2, T3, T4, T5, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3,
        args[3] as T4,
        args[4] as T5
    )
}

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 */
public inline fun <reified T, R> combine(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = flow {
    combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 */
public inline fun <reified T, R> combineTransform(
    vararg flows: Flow<T>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> = safeFlow {
    combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
}

/*
 * Same as combine, but does not copy array each time, deconstructing existing
 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
 */
private inline fun <reified T, R> combineUnsafe(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = flow {
    combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
}

/*
 * Same as combineTransform, but does not copy array each time, deconstructing existing
 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
 */
private inline fun <reified T, R> combineTransformUnsafe(
    vararg flows: Flow<T>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> = safeFlow {
    combineInternal(flows, nullArrayFactory(), { transform(it) })
}

// Saves bunch of anonymous classes
private fun <T> nullArrayFactory(): () -> Array<T>? = { null }

/**
 * Returns a [Flow] whose values are generated with [transform] function by combining
 * the most recently emitted values by each flow.
 */
public inline fun <reified T, R> combine(
    flows: Iterable<Flow<T>>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> {
    val flowArray = flows.toList().toTypedArray()
    return flow {
        combineInternal(
            flowArray,
            arrayFactory = { arrayOfNulls(flowArray.size) },
            transform = { emit(transform(it)) })
    }
}

/**
 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
 *
 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
 * generic function that may transform emitted element, skip it or emit it multiple times.
 */
public inline fun <reified T, R> combineTransform(
    flows: Iterable<Flow<T>>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
    val flowArray = flows.toList().toTypedArray()
    return safeFlow {
        combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
    }
}

/**
 * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
 * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
 *
 * It can be demonstrated with the following example:
 * ```
 * val flow = flowOf(1, 2, 3).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
 * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2b 3c"
 * }
 * ```
 *
 * ### Buffering
 *
 * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
 * [other] flow is collected concurrently as if `buffer(0)` is used. See documentation in the [buffer] operator
 * for explanation. You can use additional calls to the [buffer] operator as needed for more concurrency.
 */
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
